package xdelay

import (
	"context"
	"encoding/json"
	"fmt"
	"gitee.com/go-mid/infra/xmq/mqconf"
	"net/http"
	"strings"
	"time"

	"gitee.com/go-mid/infra/xhttp"
	"gitee.com/go-mid/infra/xtime"
	"gitee.com/go-mid/infra/xtrace"
)

const (
	// defaultToken is the value sent as the "token" query parameter on every
	// request URL built by httpInvoke.
	defaultToken = "01E0SSK0DJ9XX4PDFJCD3DN7WX"

	// NotFoundErrMsg is the response message that Read and ReadJob treat as
	// "no job available yet"; they keep polling when a response carries it.
	// copy from gitlab.pri.ibanyu.com/middleware/delayqueue/model/errors.go
	NotFoundErrMsg = "job not found"
)

// DelayHandler binds a DelayClient to a single job ID so that the job can be
// acknowledged later through Ack without passing the ID around.
type DelayHandler struct {
	// cli is the client used to send the acknowledgement.
	cli   *DelayClient
	// jobID identifies the job that Ack acknowledges.
	jobID string
}

// NewDelayHandler returns a DelayHandler that acknowledges the job identified
// by jobID through cli.
func NewDelayHandler(cli *DelayClient, jobID string) *DelayHandler {
	handler := new(DelayHandler)
	handler.cli = cli
	handler.jobID = jobID
	return handler
}

// Ack acknowledges the handler's job by delegating to DelayClient.Ack with the
// job ID stored in the handler.
func (p *DelayHandler) Ack(ctx context.Context) error {
	client, id := p.cli, p.jobID
	return client.Ack(ctx, id)
}

// DelayClient is a client for the delay-queue HTTP service.
type DelayClient struct {
	// endpoint is prefixed to every request path when the URL is built.
	endpoint string

	// httpCli performs the HTTP POST requests.
	httpCli *xhttp.HttpClientWrapper

	// namespace is the path segment in /base/delayqueue/<namespace>/...;
	// queue is sent in the body of every request.
	namespace string
	queue     string

	// ttlSeconds and tries are the publish defaults used by WriteJob;
	// ttrSeconds is the consume default used by ReadJob.
	ttlSeconds uint32
	tries      uint16
	ttrSeconds uint32

	// requestInterval is the pause taken before each consume poll in
	// Read and ReadJob.
	requestInterval time.Duration
}

// Job is a delay-queue job as decoded from a consume response.
type Job struct {
	Namespace string `json:"namespace"`
	Queue     string `json:"queue"`
	Body      []byte `json:"body"` // job payload
	ID        string `json:"id"`
	TTL       uint32 `json:"ttl"`        // job expiry time, in seconds
	Delay     uint32 `json:"delay"`      // job delay, in seconds
	ElapsedMS int64  `json:"elapsed_ms"` // time from job creation to consumption, in milliseconds
}

// writeRes is the JSON envelope decoded from a publish response.
// Write and WriteJob treat Ret == -1 as failure and report Msg as the error
// text; otherwise they return Data.Ent.JobID.
type writeRes struct {
	Ret  int    `json:"ret"`
	Msg  string `json:"msg,omitempty"`
	Data struct {
		Ent publishResult `json:"ent"`
	} `json:"data,omitempty"`
}

// readRes is the JSON envelope decoded from a consume response.
// Read and ReadJob keep polling when Msg equals NotFoundErrMsg or when
// Data.Ent.Job is nil, and fail when Ret == -1.
type readRes struct {
	Ret  int    `json:"ret"`
	Msg  string `json:"msg,omitempty"`
	Data struct {
		Ent struct {
			Job *Job `json:"job"`
		} `json:"ent"`
	} `json:"data,omitempty"`
}

// ackRes is the JSON envelope decoded from a delete (acknowledge) response.
// Ack treats Ret == -1 as failure and reports Msg as the error text; Data
// carries no fields.
type ackRes struct {
	Ret  int      `json:"ret"`
	Msg  string   `json:"msg,omitempty"`
	Data struct{} `json:"data,omitempty"`
}

// publishResult is the "ent" payload of a publish response; it carries the ID
// that Write and WriteJob return to the caller.
type publishResult struct {
	JobID string `json:"job_id"`
}

// publishRequest is the JSON body posted to the publish endpoint.
type publishRequest struct {
	Queue        string `json:"queue"`         // queue name
	Body         []byte `json:"body"`          // job payload
	TTLSeconds   uint32 `json:"ttl_seconds"`   // job expiry time
	DelaySeconds uint32 `json:"delay_seconds"` // delay time
	Tries        uint16 `json:"tries"`         // retry count
}

// consumeRequest is the JSON body posted to the consume endpoint.
type consumeRequest struct {
	Queue      string `json:"queue"`
	TTRSeconds uint32 `json:"ttr_seconds"` // job run time
}

// deleteJobRequest is the JSON body posted to the delete endpoint by Ack.
type deleteJobRequest struct {
	Queue string `json:"queue"`
	JobID string `json:"job_id"`
}
// DelayMqConfig is the configuration consumed by NewDelayClient.
type DelayMqConfig struct {
	// MQConfig is embedded; NewDelayClient reads its Namespace.
	mqconf.MQConfig
	// Endpoint is prefixed to every request path when the URL is built.
	Endpoint        string `json:"endpoint"`
	// Queue is the queue name sent in every request body.
	Queue           string `json:"queue"`
	// TTLSeconds and Tries are the publish defaults used by WriteJob;
	// TTRSeconds is the consume default used by ReadJob.
	TTLSeconds      uint32 `json:"ttl_seconds"`
	TTRSeconds      uint32 `json:"ttr_seconds"`
	Tries           uint16 `json:"tries"`
	// RequestInterval is the pause between consume polls, in seconds
	// (NewDelayClient multiplies it by time.Second).
	RequestInterval int64  `json:"request_interval"`
}

// NewDelayClient builds a DelayClient from conf, backed by the package's
// default HTTP client wrapper. conf.RequestInterval is interpreted as a number
// of seconds.
func NewDelayClient(conf DelayMqConfig) *DelayClient {
	client := new(DelayClient)
	client.httpCli = createDefaultHttpClientWrapper()

	// Where to send requests.
	client.endpoint = conf.Endpoint
	client.namespace = conf.Namespace
	client.queue = conf.Queue

	// Defaults applied by WriteJob and ReadJob.
	client.ttlSeconds = conf.TTLSeconds
	client.ttrSeconds = conf.TTRSeconds
	client.tries = conf.Tries

	pollSeconds := time.Duration(conf.RequestInterval)
	client.requestInterval = pollSeconds * time.Second
	return client
}

// Write publishes value as a delayed job and returns the job ID assigned by
// the delay-queue service.
//
// value is JSON-encoded and sent as the job body; ttlSeconds, delaySeconds and
// tries are forwarded unchanged in the publish request. The duration of the
// HTTP call is reported through mqconf.StatReqDuration.
//
// An error is returned when value cannot be marshalled (the marshal error is
// wrapped), when the HTTP call fails, or when the service answers with
// ret == -1.
//
// Deprecated: use DelayClient.WriteJob instead
func (p *DelayClient) Write(ctx context.Context, value interface{}, ttlSeconds, delaySeconds uint32, tries uint16) (jobID string, err error) {
	fun := "DelayClient.Write --> "
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setProducerDelaySpanTags(span)
	}

	msg, marshalErr := json.Marshal(value)
	if marshalErr != nil {
		// Prefix with the function name like the other errors here and keep
		// the cause unwrappable for callers.
		return "", fmt.Errorf("%s json marshal, value = %v, err = %w", fun, value, marshalErr)
	}
	req := &publishRequest{
		Queue:        p.queue,
		Body:         msg,
		TTLSeconds:   ttlSeconds,
		DelaySeconds: delaySeconds,
		Tries:        tries,
	}
	path := fmt.Sprintf("/base/delayqueue/%s/job/publish", p.namespace)

	// Only the HTTP round trip is timed; marshalling above is excluded.
	st := xtime.NewTimeStat()
	defer func() {
		mqconf.StatReqDuration(ctx, p.getTopic(), "DelayClient.Write", mqconf.TraceMessageBusTypeDelay, st.Millisecond())
	}()

	res := new(writeRes)
	if err = p.httpInvoke(ctx, path, req, res); err != nil {
		return "", err
	}
	if res.Ret == -1 {
		return "", fmt.Errorf("%s http invoke, path = %s, err = %s", fun, path, res.Msg)
	}
	return res.Data.Ent.JobID, nil
}

// WriteJob publishes value as a delayed job, using the client's configured TTL
// and tries, and returns the job ID assigned by the delay-queue service.
//
// value is JSON-encoded and sent as the job body; delaySeconds is forwarded
// unchanged in the publish request. The duration of the HTTP call is reported
// through mqconf.StatReqDuration.
//
// An error is returned when value cannot be marshalled (the marshal error is
// wrapped), when the HTTP call fails, or when the service answers with
// ret == -1.
func (p *DelayClient) WriteJob(ctx context.Context, value interface{}, delaySeconds uint32) (jobID string, err error) {
	fun := "DelayClient.WriteJob --> "
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setProducerDelaySpanTags(span)
	}

	msg, marshalErr := json.Marshal(value)
	if marshalErr != nil {
		// Prefix with the function name like the other errors here and keep
		// the cause unwrappable for callers.
		return "", fmt.Errorf("%s json marshal, value = %v, err = %w", fun, value, marshalErr)
	}
	req := &publishRequest{
		Queue:        p.queue,
		Body:         msg,
		TTLSeconds:   p.ttlSeconds,
		DelaySeconds: delaySeconds,
		Tries:        p.tries,
	}
	path := fmt.Sprintf("/base/delayqueue/%s/job/publish", p.namespace)

	// Only the HTTP round trip is timed; marshalling above is excluded.
	// NOTE(review): the metric is recorded under the "DelayClient.Write" label,
	// the same one Write uses; left unchanged here — confirm this is intended.
	st := xtime.NewTimeStat()
	defer func() {
		mqconf.StatReqDuration(ctx, p.getTopic(), "DelayClient.Write", mqconf.TraceMessageBusTypeDelay, st.Millisecond())
	}()

	res := new(writeRes)
	if err = p.httpInvoke(ctx, path, req, res); err != nil {
		return "", err
	}
	if res.Ret == -1 {
		return "", fmt.Errorf("%s http invoke, path = %s, err = %s", fun, path, res.Msg)
	}
	return res.Data.Ent.JobID, nil
}

// Read polls the consume endpoint until a job is available and returns it.
//
// Before every poll it waits for the client's request interval. ttrSeconds is
// forwarded unchanged in the consume request. A response whose message equals
// NotFoundErrMsg, or that carries no job, causes another poll. The duration of
// each HTTP call is reported through mqconf.StatReqDuration.
//
// Read returns ctx.Err() if ctx is done while it is waiting between polls, the
// underlying error if an HTTP call fails, and an error built from the response
// message if the service answers with ret == -1.
//
// Deprecated: use DelayClient.ReadJob instead
func (p *DelayClient) Read(ctx context.Context, ttrSeconds uint32) (job *Job, err error) {
	fun := "DelayClient.Read -->"
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setConsumerDelaySpanTags(span)
	}
	req := &consumeRequest{
		Queue:      p.queue,
		TTRSeconds: ttrSeconds,
	}
	path := fmt.Sprintf("/base/delayqueue/%s/job/consume", p.namespace)
	for {
		// Pause between polls, but stop waiting as soon as ctx is done so
		// the caller can cancel an otherwise endless loop.
		timer := time.NewTimer(p.requestInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}

		// Decode each response into a fresh value: json.Unmarshal leaves
		// fields that are absent from the payload untouched, so a reused
		// struct could carry a stale Msg or Ret over from the previous poll.
		res := new(readRes)
		st := xtime.NewTimeStat()
		err = p.httpInvoke(ctx, path, req, res)
		mqconf.StatReqDuration(ctx, p.getTopic(), "DelayClient.Read", mqconf.TraceMessageBusTypeDelay, st.Millisecond())
		if err != nil {
			return nil, err
		}
		switch {
		case res.Msg == NotFoundErrMsg:
			// Queue is empty for now; poll again.
			continue
		case res.Ret == -1:
			return nil, fmt.Errorf("%s httpInvoke, path = %s, err = %s", fun, path, res.Msg)
		case res.Data.Ent.Job == nil:
			continue
		}
		return res.Data.Ent.Job, nil
	}
}

// ReadJob polls the consume endpoint until a job is available and returns it,
// using the client's configured TTR.
//
// Before every poll it waits for the client's request interval. A response
// whose message equals NotFoundErrMsg, or that carries no job, causes another
// poll. The duration of each HTTP call is reported through
// mqconf.StatReqDuration.
//
// ReadJob returns ctx.Err() if ctx is done while it is waiting between polls,
// the underlying error if an HTTP call fails, and an error built from the
// response message if the service answers with ret == -1.
func (p *DelayClient) ReadJob(ctx context.Context) (job *Job, err error) {
	fun := "DelayClient.ReadJob -->"
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setConsumerDelaySpanTags(span)
	}
	req := &consumeRequest{
		Queue:      p.queue,
		TTRSeconds: p.ttrSeconds,
	}
	path := fmt.Sprintf("/base/delayqueue/%s/job/consume", p.namespace)
	for {
		// Pause between polls, but stop waiting as soon as ctx is done so
		// the caller can cancel an otherwise endless loop.
		timer := time.NewTimer(p.requestInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}

		// Decode each response into a fresh value: json.Unmarshal leaves
		// fields that are absent from the payload untouched, so a reused
		// struct could carry a stale Msg or Ret over from the previous poll.
		res := new(readRes)
		st := xtime.NewTimeStat()
		err = p.httpInvoke(ctx, path, req, res)
		mqconf.StatReqDuration(ctx, p.getTopic(), "DelayClient.Read", mqconf.TraceMessageBusTypeDelay, st.Millisecond())
		if err != nil {
			return nil, err
		}
		switch {
		case res.Msg == NotFoundErrMsg:
			// Queue is empty for now; poll again.
			continue
		case res.Ret == -1:
			return nil, fmt.Errorf("%s httpInvoke, path = %s, err = %s", fun, path, res.Msg)
		case res.Data.Ent.Job == nil:
			continue
		}
		return res.Data.Ent.Job, nil
	}
}

// Ack confirms consumption of the job identified by jobID by posting a delete
// request for it. It returns the HTTP error if the call fails, or an error
// built from the response message if the service answers with ret == -1.
func (p *DelayClient) Ack(ctx context.Context, jobID string) error {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setConsumerDelaySpanTags(span)
	}

	var (
		path    = fmt.Sprintf("/base/delayqueue/%s/job/delete", p.namespace)
		payload = &deleteJobRequest{Queue: p.queue, JobID: jobID}
		reply   = new(ackRes)
	)
	if err := p.httpInvoke(ctx, path, payload, reply); err != nil {
		return err
	}
	if reply.Ret != -1 {
		return nil
	}

	const fun = "DelayClient.Ack -->"
	return fmt.Errorf("%s http invoke, path = %s, err = %s", fun, path, reply.Msg)
}

// httpInvoke POSTs req, JSON-encoded, to endpoint+path with the default token
// as a query parameter, and decodes the JSON response body into res.
//
// res must be a non-nil pointer to the value that should receive the
// response; anything else makes json.Unmarshal return an error instead of
// silently discarding the decoded data. Each request is given a one-minute
// timeout. A non-200 status code is reported as an error that includes the
// URL, the code and the response body.
//
// NOTE(review): ctx is accepted but not forwarded to the HTTP client, so
// cancelling ctx does not abort an in-flight request.
func (p *DelayClient) httpInvoke(ctx context.Context, path string, req interface{}, res interface{}) error {
	url := fmt.Sprintf("%s%s?token=%s", p.endpoint, path, defaultToken)
	data, err := json.Marshal(req)
	if err != nil {
		return err
	}
	resData, code, err := p.httpCli.HttpReqPost(url, data, time.Minute)
	if err != nil {
		return err
	}
	if code != http.StatusOK {
		return fmt.Errorf("http request, url = %s, code = %d, data = %s", url, code, string(resData))
	}
	// res already holds the caller's pointer; pass it as-is rather than a
	// pointer to the interface variable.
	return json.Unmarshal(resData, res)
}

// setProducerDelaySpanTags tags span with the producer span kind and then
// applies the delay-queue tags set by setDelaySpanTags.
func (p *DelayClient) setProducerDelaySpanTags(span xtrace.Span) {
	span.SetTag(
		xtrace.TagSpanKind,
		xtrace.SpanKindProducer,
	)
	p.setDelaySpanTags(span)
}

// setConsumerDelaySpanTags tags span with the consumer span kind and then
// applies the delay-queue tags set by setDelaySpanTags.
func (p *DelayClient) setConsumerDelaySpanTags(span xtrace.Span) {
	span.SetTag(
		xtrace.TagSpanKind,
		xtrace.SpanKindConsumer,
	)
	p.setDelaySpanTags(span)
}

// setDelaySpanTags applies the tags common to producer and consumer spans:
// the component, the message-bus type, and the client's topic as both the
// message-bus destination and the messaging destination (kind: queue).
func (p *DelayClient) setDelaySpanTags(span xtrace.Span) {
	// The topic is the same for both destination tags; build it once.
	topic := p.getTopic()

	span.SetTag(xtrace.TagComponent, mqconf.TraceComponent)
	span.SetTag(xtrace.TagPalfishMessageBusType, mqconf.TraceMessageBusTypeDelay)
	span.SetTag(xtrace.TagMessageBusDestination, topic)
	span.SetTag(xtrace.TagMessagingDestinationKind, xtrace.MessagingDestinationKindQueue)
	span.SetTag(xtrace.TagMessagingDestination, topic)
}

// getTopic returns the client's topic name in "<namespace>.<queue>" form.
func (p *DelayClient) getTopic() string {
	return p.namespace + "." + p.queue
}

// parseTopic splits topic at its last '.' into a namespace and a queue, e.g.
// "group.service.module" yields namespace "group.service" and queue "module".
// It returns an error when topic contains no '.'.
func parseTopic(topic string) (namespace, queue string, err error) {
	dot := strings.LastIndex(topic, ".")
	if dot < 0 {
		return "", "", fmt.Errorf("topic format, topic = %s", topic)
	}
	return topic[:dot], topic[dot+1:], nil
}

// createDefaultHttpClientWrapper returns an xhttp wrapper around an
// http.Client that uses its own connection-pooling transport. The client's
// overall Timeout is left at zero.
func createDefaultHttpClientWrapper() *xhttp.HttpClientWrapper {
	transport := new(http.Transport)
	transport.MaxIdleConnsPerHost = 128
	transport.MaxConnsPerHost = 1024
	transport.IdleConnTimeout = 10 * time.Minute

	client := &http.Client{Transport: transport}
	return xhttp.NewHttpClientWrapper(client)
}
